package drds.data_propagate.parse.binlog_event_position_manager;

import drds.data_propagate.common.utils.JsonUtils;
import drds.data_propagate.common.zookeeper.ZkClientx;
import drds.data_propagate.common.zookeeper.ZookeeperPathUtils;
import drds.data_propagate.entry.position.BinLogEventPosition;
import drds.data_propagate.parse.exception.ParseException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;


/**
 * Binlog event position manager that keeps each task's position in ZooKeeper.
 *
 * <p>Positions are serialized to bytes through {@link JsonUtils} and stored at the
 * node path that {@link ZookeeperPathUtils#getParsePath(String)} yields for the task id.
 */
public class ZooKeeperBinLogEventPositionManager extends AbstractBinLogEventPositionManager {

    /** ZooKeeper client used for every read and write; never {@code null}. */
    private final ZkClientx zookeeperClient;

    /**
     * Creates a manager that talks to ZooKeeper through the given client.
     *
     * @param zkClient the ZooKeeper client to use
     * @throws NullPointerException if {@code zkClient} is {@code null}
     */
    public ZooKeeperBinLogEventPositionManager(ZkClientx zkClient) {
        if (null == zkClient) {
            throw new NullPointerException("null zkClient");
        }
        this.zookeeperClient = zkClient;
    }

    /**
     * Reads the position stored for the given task.
     *
     * @param taskId id of the task whose position is requested
     * @return the deserialized position, or {@code null} when the read yields
     *         {@code null} or an empty byte array
     */
    @Override
    public BinLogEventPosition getLatestBinLogEventPosition(String taskId) {
        final String nodePath = ZookeeperPathUtils.getParsePath(taskId);
        // NOTE(review): the 'true' flag presumably asks the client to return null
        // instead of throwing when the node does not exist - confirm against ZkClient.
        final byte[] payload = zookeeperClient.readData(nodePath, true);
        if (payload != null && payload.length > 0) {
            return JsonUtils.unmarshalFromByte(payload, BinLogEventPosition.class);
        }
        return null;
    }

    /**
     * Stores the given position for the task, creating the node when the write
     * reports that it does not exist.
     *
     * @param taskId              id of the task whose position is stored
     * @param binLogEventPosition position to serialize and store
     * @throws ParseException declared by the overridden contract; not thrown directly here
     */
    @Override
    public void persistBinLogEventPosition(String taskId, BinLogEventPosition binLogEventPosition) throws ParseException {
        final String nodePath = ZookeeperPathUtils.getParsePath(taskId);
        final byte[] payload = JsonUtils.marshalToByte(binLogEventPosition);
        try {
            zookeeperClient.writeData(nodePath, payload);
        } catch (ZkNoNodeException nodeMissing) {
            // Node is absent: create it with the payload instead.
            // NOTE(review): the trailing 'true' presumably requests creation of
            // missing parent nodes - confirm against ZkClient.
            zookeeperClient.createPersistent(nodePath, payload, true);
        }
    }

}
